Elasticsearch scroll 之滚动查询

Posted by xyx on 2025-01-14
Words 914 and Reading Time 4 Minutes
Viewed Times

Elasticsearch scroll 之滚动查询

Elasticsearch 的 Scroll API 是一种用于处理大规模数据集的机制,特别是在需要从索引中检索大量数据时。通常情况下,Elasticsearch 的搜索请求会有一个结果集大小的限制 (from+size 的检索数量默认是 10,000 条记录),而 Scroll API 允许你绕过这个限制,通过滚动的方式逐步获取数据

关键概念

  • Scroll Context(滚动上下文)
    • 当你第一次发起一个滚动请求时,Elasticsearch 会创建一个滚动上下文。这个上下文保存了搜索的状态和位置,以便在后续请求中继续检索数据
    • 滚动上下文是有状态的,它在服务器端保存了一段时间 (由你指定的超时时间决定)
  • Scroll ID(滚动 ID)
    • 每次滚动请求都会返回一个scrollId,这是一个唯一标识符,用于标识和管理滚动上下文
    • 你需要在后续的滚动请求中提供这个scrollId,以便 Elasticsearch 知道从哪里继续检索数据
  • Timeout(超时时间)
    • 你可以为滚动上下文指定一个超时时间,这个时间决定了滚动上下文在服务器端保持活跃的时间
    • 如果在超时时间内没有新的滚动请求,滚动上下文会被自动清除

      工作原理

  • 初始请求
    • 你首先发起一个搜索请求,并指定滚动参数 (如超时时间)。这个请求会返回初始的搜索结果和一个scrollId
  • 后续请求
    • 使用返回的scrollId发起后续的滚动请求。每个请求都会返回一批新的结果和一个新的scrollId
    • 你继续使用新的scrollId进行后续请求,直到没有更多结果返回
  • 清除滚动上下文
    • 当你完成数据检索后,应该显式地清除滚动上下文,以释放服务器资源。这可以通过ClearScrollRequest来实现

Java 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.xxx;

import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.enthusa.avatar.core.utils.DateUtil;
import org.enthusa.avatar.utils.task.TaskModel;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class ESScrollTask extends AbstractTask {

private static final String[] INCLUDE_FIELDS = {"entity_id", "job_name", "job_city", "edu_level", "locations", "company_id", "career_job_id2", "salary"};
private static final String[] EXCLUDE_FIELDS = {};

public static final int ES_EACH_SIZE = 500;
public static final int ES_TOTAL_SIZE = 10000;

@Resource
protected RestHighLevelClient utEsClient;

private List<PlatformJob> termSearchWithScroll(Integer recruitType) {
final long scrollTimeout = 60000;
List<PlatformJob> platformJobs = new ArrayList<>();
try {
SearchRequest searchRequest = new SearchRequest(Constants.ES_JOB_ITEM);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 自定义查询语句
searchSourceBuilder.query(buildQuery(recruitType));
searchSourceBuilder.fetchSource(INCLUDE_FIELDS, EXCLUDE_FIELDS);
searchSourceBuilder.size(ES_EACH_SIZE);
searchRequest.source(searchSourceBuilder);
// 滚动查询及超时时间
searchRequest.scroll(TimeValue.timeValueMillis(scrollTimeout));

SearchResponse searchResponse = utEsClient.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();

while (searchHits != null && searchHits.length > 0 && platformJobs.size() < ES_TOTAL_SIZE) {
for (SearchHit hit : searchHits) {
// 自定义业务
PlatformJob platformJob = new PlatformJob();
platformJob.setJobId(IdMapping.toId((Long) hit.getSourceAsMap().get("entity_id")));
platformJob.setJobName((String) hit.getSourceAsMap().get("job_name"));
platformJob.setJobCity((String) hit.getSourceAsMap().get("job_city"));
platformJob.setEducation(eduLevelMap.get((Integer) hit.getSourceAsMap().get("edu_level")));
platformJob.setLocations((String) hit.getSourceAsMap().get("locations"));
platformJob.setCompanyId((Integer) hit.getSourceAsMap().get("company_id"));
platformJob.setCareerJobId2((Integer) hit.getSourceAsMap().get("career_job_id2"));
platformJob.setSalary((String) hit.getSourceAsMap().get("salary"));
platformJobs.add(platformJob);
}

SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TimeValue.timeValueMillis(scrollTimeout));
searchResponse = utEsClient.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
}

// 清除滚动上下文
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
utEsClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.info("es search error", e);
return Collections.emptyList();
}
return platformJobs;
}

public BoolQueryBuilder buildQuery(Integer recruitType) {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.filter(QueryBuilders.termQuery("content_type", IdType.PLATFORM_JOB.toString()));
queryBuilder.filter(QueryBuilders.termQuery("status", 0));
queryBuilder.filter(QueryBuilders.termQuery("recruit_type", recruitType));
return queryBuilder;
}
}